выполнил: Морозов Е.А.
Необходимо осуществить анализ логов, создав скрипт для формирования витрины на основе логов web-сайта. Разработаный скрипт витрины данных должен иметь следующее содержание и отображать следующие данные:
Источник данных (логи web-сайта): https://disk.yandex.ru/d/__
Выполнение разбито на стадии в рамках ETL-процесса: extract, transform и load.
Загрузка данных.
Извлечение данных.
Состоит в обработке файла логов с помощью методов ЯП *python* для извлечения структурированной информации.
Преобразование данных.
Состоит в аггрегировании и трансформации сведенных в табличную форму данных выбранными для этого инструментами/в соотв. библиотеках (Базы Данных, табличные прцессоры Pandas/Dusk/SPARK).
Позволит найти ответы на вопросы задания, группируя и сортируя данные подходящим способом.
Выгрузка результатов.
Сформированные в результате работы скрипта витрины данных сохраняются в подходящем для дальнейшего применения виде.
Выводы об эффективности процесса.
Код:
# организация хранения данных проекта
print("В рабочей директории создана папка 'morozov_ea' для организации хранения данных проекта.\n")
! ls -lah ..
Вывод:
В рабочей директории создана папка 'morozov_ea' для организации хранения данных проекта.
total 48K
drwxr-x--- 9 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 22 18:33 .
drwxr-xr-x 31 root root 4.0K Dec 29 14:51 ..
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 220 Feb 25 2020 .bash_logout
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 3.7K Feb 25 2020 .bashrc
drwxr-xr-x 5 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 22 18:34 .cache
drwxrwsr-x 3 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 22 18:33 .conda
lrwxrwxrwx 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 55 Dec 16 11:34 dags -> /root/data-analysis/airflow/dags/jupyter-morozov_evgeny
drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 16 22:19 .ipynb_checkpoints
drwxr-xr-x 3 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 16 22:25 .ipython
drwxr-xr-x 3 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 21 21:52 .jupyter
drwxr-xr-x 6 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 22 18:34 .local
drwxr-xr-x 6 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan 9 00:12 morozov_ea
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 807 Feb 25 2020 .profile
Код:
# структура проекта и файлы
print("Папка проекта содержит:\n - папку",
"\033[1m" + "'input_data'" + "\033[0m",
"с исходными данными задания,\n - папку",
"\033[1m" + "'output_data'" + "\033[0m",
"с файлами, получаемыми в ходе работы и считающимися итоговыми результатами,\n - папку",
"\033[1m" + "'spark-warehouse'" + "\033[0m",
", создаваемую при работе фреймворка SPARK (подробнее в соотв. разделе исследования).\n")
!ls -lh
Вывод:
Папка проекта содержит:
- папку 'input_data' с исходными данными задания,
- папку 'output_data' с файлами, получаемыми в ходе работы и считающимися итоговыми результатами,
- папку 'spark-warehouse' , создаваемую при работе фреймворка SPARK (подробнее в соотв. разделе исследования).
total 264K
drwxr-xr-x 3 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan 7 23:10 input_data
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 251K Jan 9 00:12 morozov_ea_final_03.ipynb
drwxr-xr-x 5 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan 8 23:06 output_data
drwxr-xr-x 3 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan 9 00:18 spark-warehouse
Код:
print("рабочие файлы в 'input_data':")
! ls -lh ./input_data/
Вывод:
рабочие файлы в 'input_data':
total 3.4G
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 3.3G Jan 7 23:07 access-Copy1.log
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 64M Jan 7 23:07 access-Copy1_short.log
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 13M Dec 17 14:05 client_hostname.csv
# импортируем библиотеки:
import datetime
from datetime import datetime
import numpy as np
import pandas as pd
from random import sample
import re
from tqdm import tqdm
# импорты модулей Spark:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import pyspark.pandas as ps
Код:
# получим объект из файла
#test_file = open("../morozov_ea/input_data/access-Copy1_short.log", "r") # обрезанная версия лога для тестов
test_file = open("../morozov_ea/input_data/access-Copy1.log", "r")
# считываем все строки
rows = test_file.readlines()
# закрываем файл
test_file.close
print(f'Количество строк в файле логов = {len(rows):_} \n'.replace("_", " "))
print('Образец данных:\n')
for row in sample(rows, 3):
print(row)
Вывод:
Количество строк в файле логов = 10 365 152
Образец данных:
5.127.49.29 - - [22/Jan/2019:16:19:52 +0330] "GET /image/43688/productModel/200x200 HTTP/1.1" 200 4662 "https://www.zanbil.ir/m/filter/p49?page=1" "Mozilla/5.0 (Linux; Android 5.1.1; SAMSUNG SM-J320F Build/LMY47V) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/7.4 Chrome/59.0.3071.125 Mobile Safari/537.36" "-"
185.240.149.33 - - [25/Jan/2019:21:28:06 +0330] "GET /static/images/next.png HTTP/1.1" 200 3045 "https://znbl.ir/static/bundle-bundle_site_head.css" "Mozilla/5.0 (Windows NT 6.1; rv:64.0) Gecko/20100101 Firefox/64.0" "-"
Входные данные довольно "грязные" и слабоструктурированные. Практически во всех записях присутствует информация о:
Потребуется большой объём предобработки.
Хотя файл логов занимает ощутимый объём (3.5 гБ слабоструктурироавнных записей), с помощью серверной платформы 1Т оказалось вполне возможным обработать записи за 1 проход. Разбиение на батчи и последовательная обработка по частям, к счастью, не потребовались, что позволило несколько сократить объём кода.
Разделим строки записей по блокам, создав для этого список значений. Из смысловых блоков последовательно извлечём нужные данные и сохраним их в подходящем для дальнейшего анализа и преобразований виде.
Этапы преобразований вынесены в отдельные функции.
Сокращённый код:
# создаём функцию для первичной очистки строк лога от лишних данных
def cleaning(rows):
new_rows = [] # в ходе очистки строк лога будем наполнять новый список новыми записями
for element in tqdm(rows, desc='Splitting the rows of data', colour='green'):
element = re.split('- -|"-"|"', element.strip()) #split с использованием regular_expressions
. . . . . . . . . .
rows.clear()
return new_rows #функция возвращает список строк, строки разделёны на блоки по разделителям
# по каждой строке-объекту получим список признаков, при дальнейшей обработке сократим количество элементов
# до необходимого кол-ва, а также обработаем сами значения
raw_data_all_features = cleaning(rows_sample)
Вывод:
Splitting the rows of data: 100%|██████████| 10365152/10365152 [01:22<00:00, 125357.62it/s]
Deleting empties: 100%|██████████| 10365152/10365152 [00:09<00:00, 1123350.02it/s]
Сокращённый код:
# создаём функцию для удаления малоинформативных записей и получаем очищенный список на выходе.
def cleaning_uninf(rows):
drop_that_rows = [] # пустой список для индексов строк годных для удаления
for i in tqdm(range(len(rows)), desc='Looking data for uninformative rows', colour='green'):
if len(rows[i]) < 5: # принятый критерий отсеивания строк
drop_that_rows.append(i)
. . . . . . . . . .
return rows_without_dropped
raw_data_all_features = cleaning_uninf(raw_data_all_features)
Вывод:
Looking data for uninformative rows: 100%|██████████| 10365152/10365152 [00:02<00:00, 4140638.28it/s]
Deleting uninformative rows: 100%|██████████| 10365152/10365152 [16:05<00:00, 10736.63it/s]
Малоинформативными являются 14353 строки.
В полученном списке 10350799 строк
Код:
# получим сокращённый список значений (4 признака вместо 6) для парсинга в более понятном виде
raw_data_4f = []
for k in tqdm(range(len(raw_data_all_features))
, desc='Creating a list with working data (4 columns istead of 6)'
, colour='green'):
raw_data_4f.append([
raw_data_all_features[k][0],
raw_data_all_features[k][1],
raw_data_all_features[k][3],
raw_data_all_features[k][-1]
])
raw_data_all_features.clear() # очищаем список raw_data_all_features
Вывод:
Creating a list with working data (4 columns istead of 6): 100%|██████████| 10350799/10350799 [00:21<00:00, 480765.46it/s]
Сокращённый код:
# функция для обработки значений столбца с датами
def date_transformer(dates):
dates_list = []
for date in tqdm(dates # с пом. reg.expr отделим даты от текста по шаблону
, desc='dates extraction'
, colour='green'):
dates_list.append(re.findall("[^'[][\w\/:]*[^\+0330\]']", date[1]))
. . . . . . . . . .
return dates_list # функция возвращает список с преобразованными датами
# получаем список с преобразованными датами
dates_list = date_transformer(raw_data_4f)
Вывод:
dates extraction: 100%|██████████| 10350799/10350799 [00:22<00:00, 464198.05it/s]
converting dates to datetime-format: 100%|██████████| 10350799/10350799 [01:36<00:00, 106926.65it/s]
Сокращённый код:
# функция для обработки значений столбца кодами состояния HTTP
def serv_answers(answers):
serv_answers_list = []
for answer in tqdm(answers
. . . . . . . . . .
return serv_answers_list # функция возвращает список с кодами ответов сервера
# получаем список с преобразованными кодами состояния HTTP
serv_answers_list = serv_answers(raw_data_4f)
Вывод:
server's answers converting: 100%|██████████| 10350799/10350799 [00:14<00:00, 702469.67it/s]
Сокращённый код:
# функция для предв. обработки значений столбца User_Agent
def dev_browsers(lines):
dev_browsers_list = []
for line in tqdm(lines
. . . . . . . . . .
dev_browsers_list.append([*re.findall("\(([\w\s.;:\-\/+@]+)\)", line[-1]),
*re.findall("\w\) {1}([\w\/\s.]+$)", line[-1])])
return dev_browsers_list # функция возвращает список с соотв. значениями User_Agent
# получаем список с предварительно разделёнными на 2 групппы значениями User_Agent
# пригодный для выделения искомых данных об устройстве, ОС, браузере
dev_browsers_list = dev_browsers(raw_data_4f)
Вывод:
User_Agent info splitting into device+browser info: 100%|██████████| 10350799/10350799 [00:48<00:00, 214572.21it/s]
Сокращённый код:
# функция обработки данных из User_Agent для выделения инф. об устройстве и ОС
def os_dev(raw_list):
dev_browsers_list_os = [] # подготовим пустой список для сбора в него записей
for el in tqdm(raw_list, desc='creating a list of devices + OS', colour='green'):
if el != []:
. . . . . . . . . .
return os_dev_list_changed # функция возвращает список с инф. об устройстве и ОС
# функции обработки данных из User_Agent для выделения инф. о браузере
def browsers(raw_list):
dev_browsers_list_br = [] # подготовим пустой список для сбора в него записей
for el in tqdm(dev_browsers_list, desc='creating a list of browsers', colour='green'):
if len(el) == 2:
. . . . . . . . . .
return browsers_list_changed # функция возвращает список с инф. о браузерах
# получим списки с информацией из User_Agent в отдельные списки заносим данные об устройствах+ОС и о браузерах
os_dev_list = os_dev(dev_browsers_list)
print('Длина списка os_dev_list (ОС+устройство) =', len(os_dev_list), 'строк.')
browsers_list = browsers(dev_browsers_list)
print('Длина списка browsers_list (используемый браузер) =', len(browsers_list), 'строк.')
# функции для отдельной сведения информации из списков с данными об устройствах/ОС/браузерах в единый список
def dev_os_br_union(os_dev_list, dev_browsers_list):
united_list = [] # подготовим пустой список для сбора записей
# проходим в цикле по всем элементам списков, созданных выше (os_dev_list, browsers_list)
# и заносим значия в итоговый чтобы 1 устройству соответствовал 1 браузер исходя из простой логики
for i in tqdm(range(len(os_dev_list)), desc='creating of united device/OS/browser list', colour='green'):
# ветвления и логика:
if len(dev_browsers_list[i]) > 1:
. . . . . . . . . .
return united_list # функция возвращает список с общей информацией
# получим список с информацией об устройствах+ОС и о браузерах в одной записи
united_usag_list = dev_os_br_union(os_dev_list, browsers_list)
Вывод:
CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.48 µs
creating a list of devices + OS: 100%|██████████| 10350799/10350799 [00:02<00:00, 3731730.27it/s]
parsing of raw devices_&_OS list: 100%|██████████| 10350799/10350799 [00:18<00:00, 555351.15it/s]
secondary parsing of devices_&_OS list: 100%|██████████| 10350799/10350799 [00:06<00:00, 1700363.94it/s]
Длина списка os_dev_list (ОС+устройство) = 10350799 строк.
creating a list of browsers: 100%|██████████| 10350799/10350799 [00:02<00:00, 3545831.72it/s]
parsing of raw browsers list: 100%|██████████| 10350799/10350799 [00:34<00:00, 303134.47it/s]
secondary parsing of browsers list: 100%|██████████| 10350799/10350799 [00:08<00:00, 1180266.43it/s]
Длина списка browsers_list (используемый браузер) = 10350799 строк.
creating of united device/OS/browser list: 100%|██████████| 10350799/10350799 [00:17<00:00, 585242.45it/s]
Предварительная обработка закончена. Далее подготовим и сведём всю иформацию в датафрейм Pandas.
обработаем данные непосредственно методами самой библиотеки Pandas,
транслируем методы Pandas во фреймворк Spark,
Spark предлагает лаконичную форму работы как с Hive QL (т.е. БД будет возможно хранить в распределённом виде в хранилище Hive), так и простоту подключения к БД PostgreSQL, что должно сократить путь до выгрузки итоговой витрины.
# интересующие нас значения хранятся в списках, составим из них словарь:
# список с IP-адресами предполагается использовать в качестве суррогатного ключа,
# сделаем его значения более похожими на ключи - преобразуем адрес и одно число:
ip_addr_list = [('').join(el[0].split('.')).strip() for el in tqdm(raw_data_4f
, desc = 'collecting from ip_addr_list'
, colour = 'YELLOW')]
# некоторая обработка потребуется и для значений списков, хранящих по несколько элементов
# в строках в виде вложенных списков, получим из них отдельные столбцы значений:
date = [el[0] for el in tqdm(dates_list
, desc = 'collecting from date_list'
, colour = 'YELLOW')]
. . . . . . . . . .
, colour = 'YELLOW')]
browser = [el[-1] for el in tqdm(united_usag_list
, desc = 'collecting from browser_list'
, colour = 'YELLOW')]
# сводим всё в словарь для создания датафрейма Pandas
data_dict = {
'addr' : ip_addr_list,
'date' : date,
'serv_answer_1' : serv_answer_1,
'serv_answer_2' : serv_answer_2,
'device' : device,
'os' : os,
'browser' : browser
}
# создаём dataframe
df = pd.DataFrame(data = data_dict)
# исправим форматы данных на числовые для колонок 'addr', 'serv_answer_1', 'serv_answer_2'
for i in tqdm([0, 2, 3], desc = 'converting data formats', colour = 'green'):
df[df.columns[i]] = pd.to_numeric(df[df.columns[i]], errors='raise', downcast=None)
#df[df.columns[i]] = df[df.columns[i]].astype('int') - либо так
# выведем информацию о таблице и первые 5 строк полученного датафрейма:
print('Описание данных и первые 5 строк полученного датафрейма:\n')
df.info()
df.head()
collecting from ip_addr_list: 100%|██████████| 10350799/10350799 [00:04<00:00, 2207416.30it/s] collecting from date_list: 100%|██████████| 10350799/10350799 [00:02<00:00, 5087080.65it/s] collecting from serv_answer_1_list: 100%|██████████| 10350799/10350799 [00:01<00:00, 5187492.88it/s] collecting from serv_answer_2_list: 100%|██████████| 10350799/10350799 [00:01<00:00, 5322285.32it/s] collecting from device_list: 100%|██████████| 10350799/10350799 [00:01<00:00, 5620854.07it/s] collecting from os_list: 100%|██████████| 10350799/10350799 [00:01<00:00, 5985918.96it/s] collecting from browser_list: 100%|██████████| 10350799/10350799 [00:01<00:00, 5276965.55it/s] converting data formats: 100%|██████████| 3/3 [00:17<00:00, 5.75s/it]
Описание данных и первые 5 строк полученного датафрейма: <class 'pandas.core.frame.DataFrame'> RangeIndex: 10350799 entries, 0 to 10350798 Data columns (total 7 columns): # Column Dtype --- ------ ----- 0 addr int64 1 date datetime64[ns] 2 serv_answer_1 int64 3 serv_answer_2 int64 4 device object 5 os object 6 browser object dtypes: datetime64[ns](1), int64(3), object(3) memory usage: 552.8+ MB
| addr | date | serv_answer_1 | serv_answer_2 | device | os | browser | |
|---|---|---|---|---|---|---|---|
| 0 | 543614941 | 2019-01-22 03:56:14 | 200 | 30577 | parse_bot | parse_bot | parse_bot |
| 1 | 31569651 | 2019-01-22 03:56:16 | 200 | 5667 | ale-l21 | android | Chrome |
| 2 | 31569651 | 2019-01-22 03:56:16 | 200 | 5379 | ale-l21 | android | Chrome |
| 3 | 4077167129 | 2019-01-22 03:56:17 | 200 | 1696 | parse_bot | parse_bot | parse_bot |
| 4 | 91997215 | 2019-01-22 03:56:17 | 200 | 41483 | x64 | windows | parse_bot |
# получим индекс по уникальным сочетаниям полей 'device' и 'os' для чего сгруппируем
# данные и сбросим индекс, чтобы получить столбец с ключами из значений индекса pd.Series
index = (
df
.groupby(by = ['device', 'os'])['addr']
.count()
.sort_index(level = ['device', 'os'])
.reset_index()
.reset_index()
.drop(columns = 'addr')
)
index.tail(5)
| index | device | os | |
|---|---|---|---|
| 3376 | 3376 | zte grand s ii lte | android |
| 3377 | 3377 | zuk z1 | android |
| 3378 | 3378 | zuk z2121 | android |
| 3379 | 3379 | zuk z2131 | android |
| 3380 | 3380 | zuk z2151 | android |
# объединим таблицы df и index по полям 'device' и 'os' чтобы добавить поле с ключами к df
# удалять поле 'addr' не станем, т.к. оно позволит выделять разных пользователей устройств
df = (
df
.merge(index
, how = 'left'
, suffixes = ('', '_index')
, left_on = ['device', 'os']
, right_on = ['device', 'os'])
)
# переименуем столбцы 'index' в 'dev_key' и 'addr' в 'user_id'
df.rename(columns = {'index':'dev_key', 'addr':'user_id'}, inplace=True)
print('Таблица приняла следующий вид:')
df.head(3)
Таблица приняла следующий вид:
| user_id | date | serv_answer_1 | serv_answer_2 | device | os | browser | dev_key | |
|---|---|---|---|---|---|---|---|---|
| 0 | 543614941 | 2019-01-22 03:56:14 | 200 | 30577 | parse_bot | parse_bot | parse_bot | 1882 |
| 1 | 31569651 | 2019-01-22 03:56:16 | 200 | 5667 | ale-l21 | android | Chrome | 76 |
| 2 | 31569651 | 2019-01-22 03:56:16 | 200 | 5379 | ale-l21 | android | Chrome | 76 |
Выполним сохранение датафрейма в формате Parquet в ходе дальнейшей работы обращаться можно будет как к полученному файлу, так и к переменной *df*, хранящей pandas.DataFrame.
%time
# сохраним датафрейм в формате parque и откроем его
# (для проверки свойств полученного файла: скорость загрузки, форматы данных, схема, занимаемый объём памяти)
df.to_parquet(path='../morozov_ea/output_data/parsed_logs_df.parquet', index=None)
df_parquet = pd.read_parquet('../morozov_ea/output_data/parsed_logs_df.parquet')
print(df_parquet.info())
df_parquet.head(3)
CPU times: user 2 µs, sys: 0 ns, total: 2 µs Wall time: 4.77 µs <class 'pandas.core.frame.DataFrame'> Int64Index: 10350799 entries, 0 to 10350798 Data columns (total 8 columns): # Column Dtype --- ------ ----- 0 user_id int64 1 date datetime64[ns] 2 serv_answer_1 int64 3 serv_answer_2 int64 4 device object 5 os object 6 browser object 7 dev_key int64 dtypes: datetime64[ns](1), int64(4), object(3) memory usage: 710.7+ MB None
| user_id | date | serv_answer_1 | serv_answer_2 | device | os | browser | dev_key | |
|---|---|---|---|---|---|---|---|---|
| 0 | 543614941 | 2019-01-22 03:56:14 | 200 | 30577 | parse_bot | parse_bot | parse_bot | 1882 |
| 1 | 31569651 | 2019-01-22 03:56:16 | 200 | 5667 | ale-l21 | android | Chrome | 76 |
| 2 | 31569651 | 2019-01-22 03:56:16 | 200 | 5379 | ale-l21 | android | Chrome | 76 |
Ответы на вопросы задания получим в несколько этапов: как с использованием кластерных вычислений так и в локальном режиме.
Для ориентира получим ответы на вопросы задания с использованием библиотеки Pandas, затем будем использовать их для сравнения с результатами во фреймворке Spark.
Выведем сперва:
1. Суррогатный ключ устройства.
2. Название устройства.
3. Количество пользователей.
4. Доля пользователей данного устройства от общего числа пользователей.
# кол-во строк в группе, т.е. кол-во запросов с разных устройств с разным user_id
unique_users_qty = len(df.groupby(['dev_key', 'user_id']))
# датафрейм по п.1-4
df_1_4 = (
df
.groupby(['dev_key', 'device', 'user_id'])
.any()['serv_answer_2']
.reset_index()
.groupby('dev_key')
.agg({'device':'first', 'user_id':'count'})
.rename(columns={'user_id':'users_qty'})
)
df_1_4['users_ratio'] = np.round((df_1_4['users_qty'] / unique_users_qty), 5)
df_1_4
| device | users_qty | users_ratio | |
|---|---|---|---|
| dev_key | |||
| 0 | 2pq93 | 1 | 0.00000 |
| 1 | 2pyb2 | 1 | 0.00000 |
| 2 | 4035d | 2 | 0.00001 |
| 3 | 5047d | 6 | 0.00002 |
| 4 | 5049w | 1 | 0.00000 |
| ... | ... | ... | ... |
| 3376 | zte grand s ii lte | 4 | 0.00001 |
| 3377 | zuk z1 | 4 | 0.00001 |
| 3378 | zuk z2121 | 15 | 0.00005 |
| 3379 | zuk z2131 | 9 | 0.00003 |
| 3380 | zuk z2151 | 1 | 0.00000 |
3381 rows × 3 columns
Сгруппируем данные и дополним витрину информацией о (cуррогатный ключ и название устройства также остаются актуальными):
5. Количество совершенных действий для данного устройства.
6. Доля совершенных действий с данного устройства, относительно других устройств.
# кол-во строк в группе, т.е. кол-во ответов сервера с различными 'serv_answer_1' и 'serv_answer_2'
unique_answers_qty = len(df.groupby(['dev_key', 'serv_answer_1', 'serv_answer_2']))
# датафрейм по п.5-6
df_5_6 = (
df
.groupby(['dev_key', 'device', 'serv_answer_1', 'serv_answer_2'])
.any()['user_id']
.reset_index()
.groupby('dev_key')
.agg({'device':'first', 'serv_answer_2':'count'})
.rename(columns={'serv_answer_2':'answers_qty'})
)
df_5_6['answers_ratio'] = np.round((df_5_6['answers_qty'] / unique_answers_qty), 6)
df_5_6
Ещё раз сгруппируем данные и дополним витрину (cуррогатный ключ и название устройства также остаются актуальными) данными о браузерах:
7. Список из 5 самых популярных браузеров, используемых на данном устройстве различными пользователями, с указанием доли использования для данного браузера относительно остальных браузеров.
# датафрейм по п.7
df_7 = (
pd.DataFrame(
df
.groupby(['dev_key', 'device', 'browser'])
.count()['date']
)
)
# дополнения (временные таблицы) для слияния с df_7 и получения расчётных величин
df_7_br_tot = df_7.copy().reset_index().groupby('browser').agg({'date':'sum'}) #групп-ка по браузеру
df_7_br_tot['ratio_total'] = np.round(df_7_br_tot['date'] / df_7_br_tot['date'].sum(), 5) #расчёт доли от общего
df_7_br_by_dev = df_7.copy().reset_index().groupby('dev_key').sum() #групп-ка по устр-ву
#расчёт кол-ва прим-ий
# объединения таблиц
df_7 = df_7.merge(df_7_br_by_dev, how='left', left_index=True, right_index=True)
df_7['ratio_by_device'] = np.round(df_7['date_x'] / df_7['date_y'], 3)
df_7 = df_7.merge(df_7_br_tot, how='left', left_index=True, right_index=True)
# удаление лишних столбцов, переименование
df_7.drop(columns = ['date_y', 'date'], inplace = True)
df_7.rename(columns = {'date_x' : 'usage_qty'}, inplace = True)
df_7.head()
Дополним витрину (cуррогатный ключ и название устройства также остаются актуальными) данными об ответах сервера:
8. Количество ответов сервера, отличных от 200, на данном устройстве.
9. Для каждого из ответов сервера, отличных от 200, сформировать поле, в котором будет содержаться количество ответов данного типа.
# датафрейм для п.8-9 без строк с отвтеом 200
df_8_9 = (
pd.DataFrame(
df
.loc[df.loc[:, 'serv_answer_1'] != 200, :]
.groupby(['dev_key', 'device', 'serv_answer_1'])
.count()['serv_answer_2']
).rename(columns = {'serv_answer_2' : 'not200_answers_qty'})
)
# добавляем столбец с суммарным кол-вом ответов, отличных от 200 + transform для распространения на все строки
df_8_9['not200_sum_by_device'] = df_8_9.groupby(level = 'dev_key').transform('sum')
# меняем порядок следования столбцов
df_8_9 = df_8_9.reindex(columns = ['not200_sum_by_device', 'not200_answers_qty'])
df_8_9
| not200_sum_by_device | not200_answers_qty | |||
|---|---|---|---|---|
| dev_key | device | serv_answer_1 | ||
| 1 | 2pyb2 | 302 | 2 | 2 |
| 2 | 4035d | 302 | 8 | 8 |
| 3 | 5047d | 302 | 2 | 2 |
| 4 | 5049w | 301 | 2 | 1 |
| 302 | 2 | 1 | ||
| ... | ... | ... | ... | ... |
| 3377 | zuk z1 | 404 | 11 | 3 |
| 499 | 11 | 1 | ||
| 3378 | zuk z2121 | 302 | 4 | 4 |
| 3379 | zuk z2131 | 302 | 7 | 5 |
| 499 | 7 | 2 |
4794 rows × 2 columns
В целях унификации можно объединить ответы п.1-6, но датафреймы п.7 и п.8-9 получились не столь подобными, их имеет смысл рассматривать отдельно.
# витрина по пунктам 1-6 задания
df_1_6 = (
df_1_4
.merge(df_5_6
, how='left'
, left_index = True
, right_index = True
, suffixes = ('','_right'))
.drop(columns = ['device_right'])
)
df_1_6.head()
| device | users_qty | users_ratio | answers_qty | answers_ratio | |
|---|---|---|---|---|---|
| dev_key | |||||
| 0 | 2pq93 | 1 | 0.00000 | 1 | 0.000001 |
| 1 | 2pyb2 | 1 | 0.00000 | 139 | 0.000108 |
| 2 | 4035d | 2 | 0.00001 | 26 | 0.000020 |
| 3 | 5047d | 6 | 0.00002 | 83 | 0.000064 |
| 4 | 5049w | 1 | 0.00000 | 137 | 0.000106 |
# витрина по пункту 7 задания
df_7.head()
| usage_qty | ratio_by_device | ratio_total | |||
|---|---|---|---|---|---|
| dev_key | device | browser | |||
| 0 | 2pq93 | Chrome | 1 | 1.000 | 0.48032 |
| 1 | 2pyb2 | Chrome | 160 | 0.976 | 0.48032 |
| parse_bot | 4 | 0.024 | 0.15273 | ||
| 2 | 4035d | Chrome | 39 | 1.000 | 0.48032 |
| 3 | 5047d | Chrome | 84 | 0.840 | 0.48032 |
# витрина по пунктам 8-9 задания
df_8_9.head()
| not200_sum_by_device | not200_answers_qty | |||
|---|---|---|---|---|
| dev_key | device | serv_answer_1 | ||
| 1 | 2pyb2 | 302 | 2 | 2 |
| 2 | 4035d | 302 | 8 | 8 |
| 3 | 5047d | 302 | 2 | 2 |
| 4 | 5049w | 301 | 2 | 1 |
| 302 | 2 | 1 |
Для курса по инженерии данных представляется логичным воспользоваться иными инструментами data engeneer: в первую очередь инструментами стэка для работы с параллельными вычислениями и распределёнными данными, - и перенести решение на эту платформу.
Первейшим из таких инструментов выглядит использование фреймворка Apache Spark.
# создаём Spark-сессию
# (часто при первом запуске вылетал с ошибкой, вдобавок хотелось думать, что можно запуститься на кластере
# поэтому методом тыка была отработана техника для входа сначала c master("local"), а потом c "yarn")
for mode in ['local', 'yarn']:
print('mode --- ', mode)
spark = (
SparkSession
.builder
.master(mode)
.appName("spark_pandas")
.getOrCreate()
)
# проверка работоспособности
spark.sql('select "spark" as hello').show()
mode --- local 23/01/09 00:44:43 WARN Utils: Your hostname, data-analysis resolves to a loopback address: 127.0.1.1; using 146.120.224.166 instead (on interface ens160) 23/01/09 00:44:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/01/09 00:44:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable mode --- yarn 23/01/09 00:44:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect. +-----+ |hello| +-----+ |spark| +-----+
df = (
spark
.read
.format('parquet')
#.option("'path', '../... или file://....') # путь можно указать в .option()
.load('../morozov_ea/output_data/parsed_logs_df.parquet')
)
# выведем схему датафрейма и посмотрим тип объекта
df.printSchema()
type(df)
root |-- user_id: long (nullable = true) |-- date: timestamp (nullable = true) |-- serv_answer_1: long (nullable = true) |-- serv_answer_2: long (nullable = true) |-- device: string (nullable = true) |-- os: string (nullable = true) |-- browser: string (nullable = true) |-- dev_key: long (nullable = true) |-- __index_level_0__: long (nullable = true)
pyspark.sql.dataframe.DataFrame
# конвертируем pyspark.sql.dataframe.DataFrame в датафрейм pandas-API и изменим название индекса
pds_df = df.pandas_api(index_col='__index_level_0__')
pds_df.index.rename('index', inplace= True)
display(pds_df.head(3))
type(pds_df)
/opt/tljh/user/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:248: FutureWarning: Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead series = series.astype(t, copy=False)
| user_id | date | serv_answer_1 | serv_answer_2 | device | os | browser | dev_key | |
|---|---|---|---|---|---|---|---|---|
| index | ||||||||
| 0 | 543614941 | 2019-01-22 03:56:14 | 200 | 30577 | parse_bot | parse_bot | parse_bot | 1882 |
| 1 | 31569651 | 2019-01-22 03:56:16 | 200 | 5667 | ale-l21 | android | Chrome | 76 |
| 2 | 31569651 | 2019-01-22 03:56:16 | 200 | 5379 | ale-l21 | android | Chrome | 76 |
pyspark.pandas.frame.DataFrame
Помимо решения непосредственно методами API SPARK SQL сравнивалось решение на Pandas-API. Ниже это показано только на примере 1 задания, более развёрнутое сравнение приводится в работе непосредственно.
Выведем:
1. Суррогатный ключ устройства.
2. Название устройства.
3. Количество пользователей.
4. Доля пользователей данного устройства от общего числа пользователей.
# кол-во запросов с разных устройств с разным user_id
unique_users_qty = pds_df.groupby(['dev_key', 'user_id']).count().count()[0]
# датафрейм по п.1-4
pds_df_1_4 = (
pds_df
.groupby(['dev_key', 'device', 'user_id'])
.any()['serv_answer_2']
.reset_index()
.groupby('dev_key')
.agg({'device':'first', 'user_id':'count'})
.rename(columns={'user_id':'users_qty'})
)
pds_df_1_4['users_ratio_1'] = unique_users_qty
pds_df_1_4['users_ratio'] = pds_df_1_4['users_qty'] / pds_df_1_4['users_ratio_1']
pds_df_1_4 = pds_df_1_4.drop(columns = 'users_ratio_1')
pds_df_1_4.head(20)
/opt/tljh/user/lib/python3.9/site-packages/pyspark/pandas/internal.py:1573: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
fields = [
/opt/tljh/user/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:486: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
for column, series in pdf.iteritems():
| device | users_qty | users_ratio | |
|---|---|---|---|
| dev_key | |||
| 0 | 2pq93 | 1 | 0.000004 |
| 1 | 2pyb2 | 1 | 0.000004 |
| 2 | 4035d | 2 | 0.000007 |
| 3 | 5047d | 6 | 0.000021 |
| 4 | 5049w | 1 | 0.000004 |
| 5 | 5050 | 4 | 0.000014 |
| 6 | 5050x | 1 | 0.000004 |
| 7 | 5095k | 1 | 0.000004 |
| 8 | 6043d | 4 | 0.000014 |
| 9 | 6p | 16 | 0.000057 |
| 10 | 702so | 1 | 0.000004 |
| 11 | 706 | 3 | 0.000011 |
| 12 | 708g | 1 | 0.000004 |
| 13 | 709a | 1 | 0.000004 |
| 14 | 7_plus | 2 | 0.000007 |
| 15 | 8030y | 1 | 0.000004 |
| 16 | 8_plus | 2 | 0.000007 |
| 17 | 9002x | 2 | 0.000007 |
| 18 | 9003x | 3 | 0.000011 |
| 19 | 9007a | 6 | 0.000021 |
С некоторыми корректировками, вызванными неполным переносом всех методов в API Pandas из родной библиотеки, задача принципиально была решена в том же виде, что и в нативном pandas с повторением всех процедур. Тот же результат получается с использованием spark.sql методов (ниже).
# в переменной сохраним число записей в сгрупированном объекте
unique_users_qty_s = df.groupBy('dev_key', 'device', 'user_id').count().count()
# датафрейм по п.1-4
s_df_1_4 = (
df
.groupBy('dev_key', 'device', 'user_id')
.count()
.select('dev_key', 'device', 'user_id')
.groupBy('dev_key', 'device')
.agg({'user_id' : 'count'})
.toDF('dev_key', 'device', 'users_qty')
.select('dev_key', 'device', 'users_qty')
.withColumn('users_ratio', f.round(f.col('users_qty') / unique_users_qty_s, 5))
.orderBy(f.asc('dev_key'))
)
s_df_1_4.show(20, truncate=False)
[Stage 22:> (0 + 1) / 1]
+-------+------+---------+-----------+ |dev_key|device|users_qty|users_ratio| +-------+------+---------+-----------+ |0 |2pq93 |1 |0.0 | |1 |2pyb2 |1 |0.0 | |2 |4035d |2 |1.0E-5 | |3 |5047d |6 |2.0E-5 | |4 |5049w |1 |0.0 | |5 |5050 |4 |1.0E-5 | |6 |5050x |1 |0.0 | |7 |5095k |1 |0.0 | |8 |6043d |4 |1.0E-5 | |9 |6p |16 |6.0E-5 | |10 |702so |1 |0.0 | |11 |706 |3 |1.0E-5 | |12 |708g |1 |0.0 | |13 |709a |1 |0.0 | |14 |7_plus|2 |1.0E-5 | |15 |8030y |1 |0.0 | |16 |8_plus|2 |1.0E-5 | |17 |9002x |2 |1.0E-5 | |18 |9003x |3 |1.0E-5 | |19 |9007a |6 |2.0E-5 | +-------+------+---------+-----------+ only showing top 20 rows
Стоит отметить, что пользователь, запустив Spark, получает "из коробки" возможность работы на API Pandas, SQL а также pySpark, что по широте возможностей, кажется, делает инструмент мощным и довольно безальтернативным.
Сгруппируем данные и дополним витрину информацией о:
5. Количество совершенных действий для данного устройства.
6. Доля совершенных действий с данного устройства, относительно других устройств.
.........
# в переменной сохраним число записей в сгрупированном объекте
unique_answers_qty_s = df.groupBy('dev_key', 'serv_answer_1', 'serv_answer_2').count().count()
# датафрейм по п.5-6
s_df_5_6 = (
df
.groupBy('dev_key', 'device', 'serv_answer_1', 'serv_answer_2')
.count()
.select('dev_key', 'device', 'serv_answer_1', 'serv_answer_2')
#.orderBy(f.asc('dev_key'), f.asc('serv_answer_1'), f.asc('serv_answer_2'))
.groupBy('dev_key', 'device')
.agg({'serv_answer_2' : 'count'})
.toDF('dev_key', 'device', 'actions_qty')
.withColumn('actions_ratio_by_device', f.round(f.col('actions_qty') / unique_answers_qty_s, 10))
.orderBy(f.asc('dev_key'))
)
s_df_5_6.show(10, truncate=False)
[Stage 95:> (0 + 1) / 1]
+-------+------+-----------+-----------------------+ |dev_key|device|actions_qty|actions_ratio_by_device| +-------+------+-----------+-----------------------+ |0 |2pq93 |1 |7.771E-7 | |1 |2pyb2 |139 |1.08016E-4 | |2 |4035d |26 |2.02044E-5 | |3 |5047d |83 |6.44988E-5 | |4 |5049w |137 |1.064618E-4 | |5 |5050 |43 |3.3415E-5 | |6 |5050x |3 |2.3313E-6 | |7 |5095k |18 |1.39877E-5 | |8 |6043d |58 |4.50714E-5 | |9 |6p |145 |1.126786E-4 | +-------+------+-----------+-----------------------+ only showing top 10 rows
Решение получилось идентичным с результатом работы на Pandas. Продолжим выполнение оставшихся заданий с использованием Spark.
Ещё раз сгруппируем данные и дополним витрину данными о браузерах:
7. Список из 5 самых популярных браузеров, используемых на данном устройстве различными пользователями, с указанием доли использования для данного браузера относительно остальных браузеров.
.........
# датафрейм по п.7
# первичная группировка и преобразование ps.Series к ps.DataFrame (для удобства)
s_df_7 = (
df
.groupby(['dev_key', 'device', 'browser'])
.count()
.orderBy(f.col('dev_key').asc())
)
# дополнения (временные таблицы) для слияния с df_7 и получения расчётных величин
s_df_7_br_by_dev = ( # группир-ка по устр-ву
s_df_7 # расчёт кол-ва применений
.groupby(['dev_key','device'])
.sum('count')
.toDF('dev_key', 'device', 'sum')
.orderBy('dev_key')
)
# объединение таблиц
s_df_7 = (
s_df_7
.join(s_df_7_br_by_dev, on=['dev_key', 'device'], how='left')
.orderBy(f.col('dev_key').asc())
)
# создание новой колонки со значениями
s_df_7 = s_df_7.withColumn('ratio_by_device', f.col('count')/f.col('sum'))
s_df_7_br_tot = ( # группир-ка по браузеру
df # расчёт доли от общего
.groupby(['browser'])
.count()
)
# добавление колонки с долей от общего кол-ва
tot_count = s_df_7_br_tot.select(f.sum('count')).collect()
s_df_7_br_tot = (
s_df_7_br_tot
.withColumn('ratio_total', f.col('count')/tot_count[0][0])
.select(['browser', 'ratio_total'])
)
# объединение таблиц
s_df_7 = (
s_df_7
.join(s_df_7_br_tot, on=['browser'], how='left')
.select(['dev_key', 'device', 'browser', 'count', 'ratio_by_device', 'ratio_total'])
.orderBy(f.col('dev_key').asc(), f.col('count').desc())
)
s_df_7.show(10)
+-------+------+---------+-----+--------------------+-------------------+ |dev_key|device| browser|count| ratio_by_device| ratio_total| +-------+------+---------+-----+--------------------+-------------------+ | 0| 2pq93| Chrome| 1| 1.0|0.48032311322053495| | 1| 2pyb2| Chrome| 160| 0.975609756097561|0.48032311322053495| | 1| 2pyb2|parse_bot| 4|0.024390243902439025|0.15272705034654813| | 2| 4035d| Chrome| 39| 1.0|0.48032311322053495| | 3| 5047d| Chrome| 84| 0.84|0.48032311322053495| | 3| 5047d|parse_bot| 16| 0.16|0.15272705034654813| | 4| 5049w| Chrome| 149| 1.0|0.48032311322053495| | 5| 5050| Chrome| 40| 0.9302325581395349|0.48032311322053495| | 5| 5050|parse_bot| 3| 0.06976744186046512|0.15272705034654813| | 6| 5050x|parse_bot| 3| 1.0|0.15272705034654813| +-------+------+---------+-----+--------------------+-------------------+ only showing top 10 rows
# датафрейм для п.8-9 без строк с отвтеом 200
pds_df_8_9 = (
ps.DataFrame(
pds_df
.loc[pds_df.loc[:, 'serv_answer_1'] != 200, :]
.groupby(['dev_key', 'device', 'serv_answer_1'])
.count()['serv_answer_2']
).rename(columns = {'serv_answer_2' : 'not200_answers_qty'})
.sort_index()
)
# добавляем столбец с суммарным кол-вом ответов, отличных от 200
# метод .transform('sum') не поддерживает тот же функционал, что в pandas, надёжнее было объединить с пом. join
sum_by_device = (
pds_df_8_9
.reset_index()
.groupby('dev_key')
.sum()
.sort_index()
)
pds_df_8_9 = (
pds_df_8_9
.reset_index()
.set_index('dev_key')
.merge(sum_by_device, how='left', left_index=True, right_index=True)
)
# меняем порядок следования столбцов, удаляем повторяющиеся, переименовываем
pds_df_8_9 = pds_df_8_9.drop(['serv_answer_1_y'], axis = 1)
pds_df_8_9 = (
pds_df_8_9
.rename(columns = {'serv_answer_1_x':'serv_answer_1', 'not200_answers_qty_y':'not200_sum_by_device'
, 'not200_answers_qty_x':'not200_answers_qty'})
.reset_index()
.set_index(['dev_key', 'device', 'serv_answer_1'])
.reindex(columns=['not200_sum_by_device', 'not200_answers_qty'])
)
pds_df_8_9.head(10)
| not200_sum_by_device | not200_answers_qty | |||
|---|---|---|---|---|
| dev_key | device | serv_answer_1 | ||
| 1 | 2pyb2 | 302 | 2 | 2 |
| 2 | 4035d | 302 | 8 | 8 |
| 3 | 5047d | 302 | 2 | 2 |
| 4 | 5049w | 301 | 2 | 1 |
| 302 | 2 | 1 | ||
| 5 | 5050 | 302 | 1 | 1 |
| 7 | 5095k | 302 | 1 | 1 |
| 9 | 6p | 302 | 8 | 1 |
| 304 | 8 | 7 | ||
| 12 | 708g | 302 | 1 | 1 |
Для экономии времени в этом месте вызовем метод .to_spark к полученному выше датафрейму pds_df_8_9. Он позволит получить из pyspark.pandas.frame.DataFrame объект pyspark.sql.dataframe.DataFrame без доп. преобразований.
s_df_8_9 = pds_df_8_9.to_spark(index_col=['dev_key', 'device', 'serv_answer_1'])
s_df_8_9.show()
+-------+--------+-------------+--------------------+------------------+ |dev_key| device|serv_answer_1|not200_sum_by_device|not200_answers_qty| +-------+--------+-------------+--------------------+------------------+ | 1| 2pyb2| 302| 2| 2| | 2| 4035d| 302| 8| 8| | 3| 5047d| 302| 2| 2| | 4| 5049w| 301| 2| 1| | 4| 5049w| 302| 2| 1| | 5| 5050| 302| 1| 1| | 7| 5095k| 302| 1| 1| | 9| 6p| 302| 8| 1| | 9| 6p| 304| 8| 7| | 12| 708g| 302| 1| 1| | 17| 9002x| 302| 1| 1| | 19| 9007a| 302| 2| 2| | 20| a 3g| 499| 1| 1| | 21| a0001| 304| 2| 1| | 21| a0001| 499| 2| 1| | 23| a1-713| 302| 2| 1| | 23| a1-713| 404| 2| 1| | 24|a1-713hd| 301| 32| 2| | 24|a1-713hd| 302| 32| 17| | 24|a1-713hd| 304| 32| 11| +-------+--------+-------------+--------------------+------------------+ only showing top 20 rows
По-прежнему в целях унификации можно объединить витрины по пунктам 1-6, датафреймы п.7 и п.8-9 получились разнородными, их имеет смысл рассматривать отдельно.
# объединим витрины п.1-4 и 5-6
s_df_1_6 = (
s_df_1_4
.join(s_df_5_6, how='left', on=['dev_key', 'device'])
.toDF('dev_key', 'device', 'users_qty', 'users_ratio', 'actions_qty', 'actions_ratio_by_device')
)
print('Витрина данных по п.1-6 (фрагмент)')
s_df_1_6.show(10)
Витрина данных по п.1-6 (фрагмент)
[Stage 183:> (0 + 1) / 1]
+-------+------+---------+-----------+-----------+-----------------------+ |dev_key|device|users_qty|users_ratio|actions_qty|actions_ratio_by_device| +-------+------+---------+-----------+-----------+-----------------------+ | 0| 2pq93| 1| 0.0| 1| 7.771E-7| | 1| 2pyb2| 1| 0.0| 139| 1.08016E-4| | 2| 4035d| 2| 1.0E-5| 26| 2.02044E-5| | 3| 5047d| 6| 2.0E-5| 83| 6.44988E-5| | 4| 5049w| 1| 0.0| 137| 1.064618E-4| | 5| 5050| 4| 1.0E-5| 43| 3.3415E-5| | 6| 5050x| 1| 0.0| 3| 2.3313E-6| | 7| 5095k| 1| 0.0| 18| 1.39877E-5| | 8| 6043d| 4| 1.0E-5| 58| 4.50714E-5| | 9| 6p| 16| 6.0E-5| 145| 1.126786E-4| +-------+------+---------+-----------+-----------+-----------------------+ only showing top 10 rows
# отобразим витрину по п.7
print('Витрина данных по п.7 (фрагмент)')
s_df_7.show(10)
# отобразим витрину по п.8-9
print('Витрина данных по п.8-9 (фрагмент)')
s_df_8_9.show(10)
Витрина данных по п.7 (фрагмент)
+-------+------+---------+-----+--------------------+-------------------+ |dev_key|device| browser|count| ratio_by_device| ratio_total| +-------+------+---------+-----+--------------------+-------------------+ | 0| 2pq93| Chrome| 1| 1.0|0.48032311322053495| | 1| 2pyb2| Chrome| 160| 0.975609756097561|0.48032311322053495| | 1| 2pyb2|parse_bot| 4|0.024390243902439025|0.15272705034654813| | 2| 4035d| Chrome| 39| 1.0|0.48032311322053495| | 3| 5047d| Chrome| 84| 0.84|0.48032311322053495| | 3| 5047d|parse_bot| 16| 0.16|0.15272705034654813| | 4| 5049w| Chrome| 149| 1.0|0.48032311322053495| | 5| 5050| Chrome| 40| 0.9302325581395349|0.48032311322053495| | 5| 5050|parse_bot| 3| 0.06976744186046512|0.15272705034654813| | 6| 5050x|parse_bot| 3| 1.0|0.15272705034654813| +-------+------+---------+-----+--------------------+-------------------+ only showing top 10 rows Витрина данных по п.8-9 (фрагмент)
+-------+------+-------------+--------------------+------------------+ |dev_key|device|serv_answer_1|not200_sum_by_device|not200_answers_qty| +-------+------+-------------+--------------------+------------------+ | 1| 2pyb2| 302| 2| 2| | 2| 4035d| 302| 8| 8| | 3| 5047d| 302| 2| 2| | 4| 5049w| 301| 2| 1| | 4| 5049w| 302| 2| 1| | 5| 5050| 302| 1| 1| | 7| 5095k| 302| 1| 1| | 9| 6p| 302| 8| 1| | 9| 6p| 304| 8| 7| | 12| 708g| 302| 1| 1| +-------+------+-------------+--------------------+------------------+ only showing top 10 rows
Результат всех способов преобразования получается одинаковый, в этом смысле разницы, какой из них выбирать - нет. Попробуем на данном спектре вариантов решения разобраться с трансляцией полученного результата в базы данных, а также с записью и хранением в Hive и HDFS.
чтение из PostgreSQL
Чтение из базы данных с помощью Spark представляется простым, т.к. в метод .read передается минимум параметров.
# последовательность вызовов на чтение из БД
(
spark
.read
.format('jdbc')
.option('url', 'jdbc:postgresql://146.120.224.166:5432/morozov_evgeny_db')
.option('dbtable', 'db')
.option('user', 'morozov_evgeny')
.option('password', 'w*******')
.load()
)
Код в работоспособное состояние привести не удалось, последовательность выше (как впрочем и ниже при попытке записи в БД) приводила к ошибкам.
запись в PostgreSQL
Запись в базу данных с помощью Spark также не представляется сложной, в метод .write передаются соотв. аргументы для подключения и записи, нужно только учитывать, что метод .write.format('jdbc')... у датафреймов pandas API отсутствует и существует только для датафреймов pyspark.sql.dataframe.DataFrame.
В данной работе чтение и запись в итоге произведены не были, поскольку на сервере либо не были установлены драйверов БД, либо их местоположение осталось мне неизвестно.
# последовательность вызовов для записи в ДБ
(
s_df_1_6
#.repartition(3) # просто для напомнинаия о партиционировании при записи в HDFS (к БД не имеет отношения)
.write
.format('jdbc')
.mode('overwrite')
.option('url', 'jdbc:postgresql://localhost:5432/morozov_evgeny_db')
#.option('url', 'jdbc:postgresql://146.120.224.166:5432/morozov_evgeny_db')
.option('dbtable', 'case_1_6')
.option('user', 'morozov_evgeny')
.option('password', 'w*******')
.save()
)
запись и чтение в реляционное хранилище Hive
Запись в БД Hive не требует установки драйверов и похожа на запись в простой файл, с той разницей, что вместо метода .save() вызывается метод .saveAsTable().
# команды отрабатывали только с методом .save() на конце, но файл "улетал" в неизвестном направлении, видимо.
# в HUE найден не был, сам HUE тоже был доступен не каждый раз. Поэтому последовательность команд перенес в
# markdown-ячейку.
(
s_df_1_4
.repartition(3) # партиционирование при записи в HDFS
.write
.format('orc')
#.partitionBy('device')
.mode('overwrite')
#.option('compression','gzip')
.save('hdfs://localhost:9099/user/morozov_evgeny/')
)
! hdfs dfs -ls -R hdfs://localhost:9099/user/morozov_evgeny/
/bin/bash: hdfs: command not found
Изначально казалось, что в данной сессии есть возможность писать в HDFS, но по всей видимости работа в 'local'-режиме не предусматривает такой возможности...
Отрицательный опыт - тоже опыт, удалять эту часть работы не станем, поскольку есть определённый потенциал к её решению и он может быть развит. Поскольку начинания успехом не увенчались, то используем метод, зарекомендовавший себя как более отлаженный.
Ещё ранее в папке с запущенным проектом Spark создал директорию ./spark-warehouse для хранения метаданных таблиц, в ней же Hive будет по умолчанию хранить созданные таблицы.
Запишем получившиеся витрины и продублируем итоги в папку "output_data", поскольку способы выше не дали желаемых результатов, такой способ также можно считать завершением этапа LOAD и выполнением работы.
# вручную пропишем последовательность вызовов для записи в Hive
(
s_df_1_6
.repartition(3) # оставил для напомнинаия о партиционировании при записи
.write
.format('parquet')
#.partitionBy('device')
.mode('overwrite')
#.option('compression','gzip')
.saveAsTable('case_1_6_parquet')
)
(
s_df_7
.repartition(3)
.write
.format('parquet')
#.partitionBy('device')
.mode('overwrite')
#.option('compression','gzip')
.saveAsTable('case_7_parquet')
)
(
s_df_8_9
.repartition(3)
.write
.format('parquet')
#.partitionBy('device')
.mode('overwrite')
#.option('compression','gzip')
.saveAsTable('case_8_9_parquet')
)
Заглянем в директорию spark-warehouse, там можно найти созданные папки вида case_*_parquet с партиционированными файлами витрин и файлом-флагом _SUCCESS, которые говорят об успешности записи бинарных файлов.
! ls -lh spark-warehouse/
total 12K drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan 9 00:46 case_1_6_parquet drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan 9 00:46 case_7_parquet drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan 9 00:46 case_8_9_parquet
Для открытия файла можно использовать более быстрый (чем .read.format().option().load()) метод spark.sql() с соотв. запросом к указанной таблице.
# в языке Hive_QL есть команды для отображения таблиц БД
spark.sql('show tables').show()
+---------+----------------+-----------+ |namespace| tableName|isTemporary| +---------+----------------+-----------+ | default|case_1_6_parquet| false| | default| case_7_parquet| false| | default|case_8_9_parquet| false| | | 1_4| true| +---------+----------------+-----------+
# обратимся к Hive для загрузки
hivedf = spark.sql('SELECT * FROM case_1_6_parquet')
hivedf.show(10)
hivedf.printSchema()
print('Количество строк case_1_6_parquet =', hivedf.count())
+-------+--------------------+---------+-----------+-----------+-----------------------+ |dev_key| device|users_qty|users_ratio|actions_qty|actions_ratio_by_device| +-------+--------------------+---------+-----------+-----------+-----------------------+ | 2181| s4700| 24| 9.0E-5| 153| 1.188953E-4| | 3113| sph-l720t| 2| 1.0E-5| 69| 5.36195E-5| | 1887| pc704| 2| 1.0E-5| 24| 1.86503E-5| | 3249| vsun h3| 1| 0.0| 83| 6.44988E-5| | 3188| trt-lx1| 1| 0.0| 1| 7.771E-7| | 2074|redmi note 4 miui...| 1| 0.0| 5| 3.8855E-6| | 2583| selfie| 19| 7.0E-5| 93| 7.22697E-5| | 560| gt-s5670| 4| 1.0E-5| 13| 1.01022E-5| | 2221| samsung gt-s7270| 1| 0.0| 12| 9.3251E-6| | 2999| sm-t116nu| 8| 3.0E-5| 148| 1.150099E-4| +-------+--------------------+---------+-----------+-----------+-----------------------+ only showing top 10 rows root |-- dev_key: long (nullable = true) |-- device: string (nullable = true) |-- users_qty: long (nullable = true) |-- users_ratio: double (nullable = true) |-- actions_qty: long (nullable = true) |-- actions_ratio_by_device: double (nullable = true) Количество строк case_1_6_parquet = 3381
# в качестве иллюстрации
print('Работа завершена, останавливаем сессию.')
spark.stop()
Работа завершена, останавливаем сессию.
print('Полученные витрины данных и датафрейм с распарсенными логами в директории',
'\033[1m' + 'output_data' + '\033[0m:')
! ls -lh ./output_data/
Полученные витрины данных и датафрейм с распарсенными логами в директории output_data:
total 112M
drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan 8 01:42 case_1_6_parquet
drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan 8 01:42 case_7_parquet
drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan 8 01:42 case_8_9_parquet
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 112M Jan 9 00:42 parsed_logs_df.parquet
Была предпринята попытка реализации логики обработки данных, условно принятых за "большие", последовательное формулирование целей работы и подбор инструментов для их реализаци заняли значительное время, но результат всё-таки был получен.
Таким образом была отработана возможность переноса методов Pandas в мир Spark и сделано их сопоставление как представителей разных "парадигм" вычислений: мира последовательных (хоть и в немалой степени оптимизированных) вычислений и распределенных/отложеных вычислений.Проведено натурное исследование взаимодействия программных платформ для обработки "больших данных".
В настоящее время, с включением Pandas API в состав low-level API Spark, количество способов обработки только возросло, но взаимодействие между библиотеками оказалось удивиельно "гибким" и "прозрачным". На скорости исполнения кода такие нововведения практически не сказываются, на мой взгляд, т.к. все оболочки (от Scala-shell и Python-shell до API SQL и API Pandas) в конечном счёте вызывают одни и теже методы движка Spark для работы с RDD, предварительно подвергаясь оптимизациям, что должно практически уравнивать методы.
Примененный способ показался довольно лаконичным (если не рассматривать исследовательскую составляющую), вариативным и включающим большую часть привитых в курсе навыков. Полученный опыт позволит применять навыки в трудовой деятельности без затруднений и, надеюсь, в скором времени.